09. ForkJoin Pools
In this section, you'll learn some of the advantages of ForkJoinPools and how to use them.
What is a ForkJoinPool?
ForkJoinPool is a specialized kind of thread pool that has the following advantages over traditional thread pools:
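- It uses a technique called work stealing: each worker thread keeps its own queue of tasks, and a worker that runs out of work can take ("steal") tasks from another worker's queue, so idle threads find work to do.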
- Its API is optimized for asynchronous work that creates more work. You might also hear this called recursive work.
In practice, work stealing usually doesn't have a huge impact on performance, because "traditional" thread pools already do a good job of distributing work across their worker threads. Depending on the kind of asynchronous tasks your program creates, however, work stealing may give an extra efficiency boost.
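To watch work stealing happen, the sketch below (a hypothetical StealDemo class, not part of the course code) recursively splits a range of indices inside a ForkJoinPool with four workers, records which threads end up processing the pieces, and prints the pool's getStealCount() estimate. The numbers vary between runs and machines, so treat them as an illustration rather than a benchmark.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public final class StealDemo {
    // Hypothetical task: halves the range [from, to) until a piece holds at most one index,
    // then records the name of the thread that processed that piece.
    static final class Split extends RecursiveAction {
        private final int from;
        private final int to;
        private final Set<String> threads;
        private final AtomicInteger leaves;

        Split(int from, int to, Set<String> threads, AtomicInteger leaves) {
            this.from = from;
            this.to = to;
            this.threads = threads;
            this.leaves = leaves;
        }

        @Override
        protected void compute() {
            if (to - from <= 1) {
                threads.add(Thread.currentThread().getName());
                leaves.addAndGet(to - from);
                return;
            }
            int mid = (from + to) >>> 1;
            // Forked subtasks wait in the current worker's queue, where idle workers can steal them.
            invokeAll(new Split(from, mid, threads, leaves), new Split(mid, to, threads, leaves));
        }
    }

    // Runs the task in a fresh pool and returns how many indices were processed.
    public static int run(int parallelism, int n) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        AtomicInteger leaves = new AtomicInteger();
        try {
            pool.invoke(new Split(0, n, threads, leaves));
            // getStealCount() is only an estimate; both numbers vary from run to run.
            System.out.println("threads used: " + threads.size()
                + ", steals: " + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
        return leaves.get();
    }

    public static void main(String[] args) {
        System.out.println("indices processed: " + run(4, 1 << 16));
    }
}
```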
ForkJoinTasks
When you create work to submit to a ForkJoinPool, you usually do so by subclassing either RecursiveTask or RecursiveAction.
Use RecursiveTask for asynchronous work that returns a value, and use RecursiveAction when the asynchronous computation does not return a value.
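As a minimal sketch of the difference, the hypothetical TaskKinds class below uses a RecursiveAction to double every element of an array in place (no result) and a RecursiveTask<Long> to sum the array (a result). The THRESHOLD value is an arbitrary assumption: below it, a task simply does its slice of the work sequentially.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public final class TaskKinds {
    static final int THRESHOLD = 1_000; // hypothetical cutoff for sequential work

    // RecursiveTask: the computation produces a value (the sum of a slice).
    static final class SumTask extends RecursiveTask<Long> {
        private final long[] data;
        private final int from, to;

        SumTask(long[] data, int from, int to) {
            this.data = data; this.from = from; this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            invokeAll(left, right);           // run both halves and wait for them
            return left.join() + right.join(); // combine the two results
        }
    }

    // RecursiveAction: the computation only has side effects (doubling elements in place).
    static final class DoubleAction extends RecursiveAction {
        private final long[] data;
        private final int from, to;

        DoubleAction(long[] data, int from, int to) {
            this.data = data; this.from = from; this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) data[i] *= 2;
                return;
            }
            int mid = (from + to) >>> 1;
            invokeAll(new DoubleAction(data, from, mid), new DoubleAction(data, mid, to));
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        Arrays.fill(data, 1);
        ForkJoinPool pool = ForkJoinPool.commonPool();
        pool.invoke(new DoubleAction(data, 0, data.length)); // no meaningful result (Void)
        long sum = pool.invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 20000
    }
}
```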
The ForkJoinPool API is optimized for recursive work, which is work that creates other work.
When you implement the compute() method of a RecursiveAction or RecursiveTask, you can submit more work to the thread pool by calling the invoke() method, or one of its many variants such as invokeAll(). Once you invoke the recursive work, your RecursiveAction or RecursiveTask can wait for the results and use them in its own computation, or it can proceed without joining the results.
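Besides invoke() and invokeAll(), ForkJoinTask also offers the lower-level fork() and join() pair. The sketch below (a hypothetical Fib task computing Fibonacci numbers, chosen only because it generates a lot of recursive work) forks one subproblem, computes the other in the current thread, and then joins to wait for the forked result. The cutoff of 20 is an assumption meant to keep tiny tasks from being dominated by scheduling overhead.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public final class FibDemo {
    // Hypothetical example: naive Fibonacci, used only because it creates lots of recursive work.
    static final class Fib extends RecursiveTask<Long> {
        private final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Long compute() {
            if (n <= 20) {
                return sequentialFib(n); // small problems: splitting further isn't worth it
            }
            Fib f1 = new Fib(n - 1);
            f1.fork();                          // schedule asynchronously in the same pool
            long f2 = new Fib(n - 2).compute(); // do the other half in the current thread
            return f1.join() + f2;              // wait for the forked result, then combine
        }
    }

    static long sequentialFib(int n) {
        return n < 2 ? n : sequentialFib(n - 1) + sequentialFib(n - 2);
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new Fib(30))); // 832040
    }
}
```

Computing one half directly instead of forking both keeps the current worker busy rather than leaving it waiting on two queued tasks.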
You can also use the "normal" thread pool methods submit() and execute().
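The following sketch (a hypothetical SubmitExecuteDemo class) contrasts the two: submit() returns a ForkJoinTask that can be joined for its result, while execute() returns nothing, so here the caller waits for completion by shutting the pool down and calling awaitTermination(). The 10-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class SubmitExecuteDemo {
    // Returns {value computed via submit(), number of execute() tasks that ran}.
    public static int[] run() {
        ForkJoinPool pool = new ForkJoinPool();

        // submit(): hands back a ForkJoinTask (which also implements Future) for the result.
        ForkJoinTask<Integer> answer = pool.submit(() -> 6 * 7);

        // execute(): fire-and-forget; nothing is returned to the caller.
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            pool.execute(counter::incrementAndGet);
        }

        int value = answer.join(); // waits for the Callable to finish

        // shutdown() still lets already-submitted tasks run; then wait for them to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {value, counter.get()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1]); // 42 10
    }
}
```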
ForkJoinPool Demo
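In this demo, we took a recursive, sequential algorithm that counts occurrences of a word in every file under a directory, and parallelized the work using a ForkJoinPool. The sequential version is kept in the code as WordCounter.countWords().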
Code from the Demo
CountWordsTask.java
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class CountWordsTask extends RecursiveTask<Long> {
    private final Path path;
    private final String word;

    public CountWordsTask(Path path, String word) {
        this.path = path;
        this.word = word;
    }

    @Override
    protected Long compute() {
        // Base case: a regular file is counted directly, without creating more work.
        if (!Files.isDirectory(path)) {
            return WordCounter.countWordInFile(path, word);
        }

        // Recursive case: create one subtask per directory entry.
        // Files.list() holds an open directory handle, so close it with try-with-resources.
        List<CountWordsTask> subtasks;
        try (Stream<Path> subpaths = Files.list(path)) {
            subtasks = subpaths
                .map(subpath -> new CountWordsTask(subpath, word))
                .collect(Collectors.toList());
        } catch (IOException e) {
            return 0L;
        }

        // Fork all subtasks into the pool and wait until every one of them is done.
        invokeAll(subtasks);

        // Every subtask has completed, so join() returns its result immediately.
        return subtasks
            .stream()
            .mapToLong(CountWordsTask::join)
            .sum();
    }
}
```
WordCounter.java
```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

public final class WordCounter {
    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("Usage: WordCounter [path] [word]");
            return;
        }
        Path start = Path.of(args[0]);
        String word = args[1];

        Instant before = Instant.now();
        // Parallel version: the pool runs the root task and all the subtasks it creates.
        ForkJoinPool pool = new ForkJoinPool();
        long count = pool.invoke(new CountWordsTask(start, word));
        Duration elapsed = Duration.between(before, Instant.now());
        System.out.println(count + " (" + elapsed.toSeconds() + " seconds)");
    }

    // Counts case-insensitive occurrences of word in one file, splitting each line on spaces.
    public static long countWordInFile(Path file, String word) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8)
                .stream()
                .flatMap(l -> Arrays.stream(l.split(" ")))
                .filter(word::equalsIgnoreCase)
                .count();
        } catch (IOException e) {
            return 0;
        }
    }

    // The original sequential, recursive algorithm that CountWordsTask parallelizes.
    // It is no longer called from main().
    private static long countWords(Path path, String word) {
        if (!Files.isDirectory(path)) {
            return countWordInFile(path, word);
        }
        // Close the directory stream when done.
        try (Stream<Path> subpaths = Files.list(path)) {
            return subpaths
                .mapToLong(p -> countWords(p, word))
                .sum();
        } catch (IOException e) {
            return 0;
        }
    }
}
```
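The demo needs a real directory tree to search. As a self-contained alternative for experimenting, the sketch below folds the same logic into one hypothetical WordCountSketch class, builds a tiny throwaway tree in a temporary directory (the file names and contents are made up), and checks that the ForkJoinPool version agrees with the sequential countWords-style version. It assumes Java 11+ for Files.writeString().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class WordCountSketch {
    // Same per-file logic as WordCounter.countWordInFile.
    static long countWordInFile(Path file, String word) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8).stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word::equalsIgnoreCase)
                .count();
        } catch (IOException e) {
            return 0;
        }
    }

    // Sequential baseline, mirroring WordCounter.countWords.
    static long countWordsSequential(Path path, String word) {
        if (!Files.isDirectory(path)) {
            return countWordInFile(path, word);
        }
        try (Stream<Path> children = Files.list(path)) {
            return children.mapToLong(p -> countWordsSequential(p, word)).sum();
        } catch (IOException e) {
            return 0;
        }
    }

    // Parallel version: one subtask per directory entry, mirroring CountWordsTask.
    static final class CountTask extends RecursiveTask<Long> {
        private final Path path;
        private final String word;

        CountTask(Path path, String word) {
            this.path = path;
            this.word = word;
        }

        @Override
        protected Long compute() {
            if (!Files.isDirectory(path)) {
                return countWordInFile(path, word);
            }
            List<CountTask> subtasks;
            try (Stream<Path> children = Files.list(path)) {
                subtasks = children.map(p -> new CountTask(p, word)).collect(Collectors.toList());
            } catch (IOException e) {
                return 0L;
            }
            invokeAll(subtasks);
            return subtasks.stream().mapToLong(CountTask::join).sum();
        }
    }

    // Builds a hypothetical sample tree: root/a.txt, root/sub/b.txt, root/sub/deeper/c.txt
    static Path buildSampleTree() {
        try {
            Path root = Files.createTempDirectory("wordcount");
            Path deeper = Files.createDirectories(root.resolve("sub").resolve("deeper"));
            Files.writeString(root.resolve("a.txt"), "fork join fork\nFORK pool");
            Files.writeString(root.resolve("sub").resolve("b.txt"), "no match here");
            Files.writeString(deeper.resolve("c.txt"), "Fork stealing fork");
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = buildSampleTree();
        long parallel = new ForkJoinPool().invoke(new CountTask(root, "fork"));
        long sequential = countWordsSequential(root, "fork");
        System.out.println(parallel + " " + sequential); // 5 5
    }
}
```

The temporary files are not deleted afterwards. On a tree this small the timing difference is meaningless, so point the real WordCounter at a large directory if you want to compare speeds.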
SOLUTION:
- Work stealing can reduce the number of idle worker threads.
- The ForkJoin API is optimized for recursive work tasks.